Dedalus

Today’s lecture is one of a few on my own group’s ongoing work on declarative programming for distributed systems. This is a bit of self-indulgence, I confess. But I do think it’s interesting, and I hope it will be useful in illuminating ways that database ideas like declarative languages can apply to new domains. At minimum you’ll learn some distributed systems basics, and get a unique lens on that field. Also, I hope, it will give you a perspective on the potential of going deep into an area and sticking with it for a long time.

Time in Distributed Systems

First, we’ll start with some basics from distributed systems. It turns out that distributed systems are not mostly a problem of spatially distributed machines; they are mostly a problem of temporally-distributed machines. That is, problems arise because different machines cannot observe the state of the world at the same time. If you like a good slogan:

Distributed Systems: It’s About Time!

If you’re a bit more pretentious, you can allude to General Relativity and Light Cones and whatnot. (But none of that is necessary or, in my admittedly Physics-poor experience, helpful.)

Lamport time, Happens-Before, Causality

Lamport’s classic Time, Clocks, and the Ordering of Events in a Distributed System (1978) introduced the happens-before relation: event $a$ happens-before event $b$ if (1) $a$ and $b$ occur at the same node and $a$ comes first, (2) $a$ is the send of a message and $b$ is its receipt, or (3) they are connected transitively through some intermediate event. If neither event happens-before the other, they are concurrent. Happens-before is our stand-in for (potential) causality, and it is what we will try to track.

Clocks

We could track causality dynamically by having every message drag along its antecedent message IDs. Usually rather expensive. Instead we’ll use “clocks” in clever ways.

Lamport Clock: Causality, Guaranteed

A Lamport Clock is achieved by 3 rules:

  1. Increment your local clock before every event.
  2. Attach your local clock value to every message you send.
  3. On message receipt, set your local clock to MAX(local clock, message clock value) + 1 — i.e., catch up to the sender and count the receipt itself as a new event.

If $e_1$ happens-before $e_2$, then $e_1$’s Lamport timestamp is less than $e_2$’s. But the converse is not true: the ordering induced by Lamport timestamps is a superset of the happens-before relation, so it may report false positives of happens-before!
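To make the three rules concrete, here is a minimal Python sketch (class and method names are mine, purely illustrative); the little demo mirrors the diagram below, where two concurrent events end up with comparable timestamps.

class LamportClock:
    """Minimal sketch of the three rules above; names are illustrative."""

    def __init__(self):
        self.time = 0

    def local_event(self):
        # Rule 1: increment the local clock before every event.
        self.time += 1
        return self.time

    def send(self):
        # Rule 2: a send is an event; attach the resulting clock value.
        self.time += 1
        return self.time

    def receive(self, msg_time):
        # Rule 3: catch up past the sender, and count the receipt as an event.
        self.time = max(self.time, msg_time) + 1
        return self.time


a, b = LamportClock(), LamportClock()
stamp = b.send()            # B's event 1
e_recv = a.receive(stamp)   # A's event 2
b.local_event()             # B's event 2
e_b = b.local_event()       # B's event 3
print(e_recv < e_b)         # True, yet neither event happens-before the other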

%%{init: {'theme':'forest'}}%%

flowchart LR
subgraph n1["Node A"]
    S12[2]
    style S12 fill:#f00,stroke:#333,stroke-width:4px

end
subgraph n2["Node B"]
    S21[1] --> S12
    S21 --> S22[2] --> S23[3]
    style S23 fill:#f00,stroke:#333,stroke-width:4px
end

Vector Clock: Causality, Captured Exactly

What if we want to track happens-before precisely?

A Vector Clock is achieved by having each node track the highest clock value it has heard from every node, i.e. a hashmap vc[nodeId => max(clock)]. This hashmap is called a vector clock.

  1. Increment vc[localId] before every event.
  2. Attach vc to every message you send.
  3. On receipt of a message carrying vc_msg:
    a. increment vc[localId];
    b. for each node: vc[node] = max(vc[node], vc_msg[node]).

Vector clocks capture happens-before precisely, at the cost of a hashmap as big as the number of nodes in the system. They allow us to compare two events and correctly decide if they were concurrent, or if one caused the other.
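Here is a minimal Python sketch of those rules plus the comparison they enable (names are mine; entries missing from a vector are treated as 0). The demo reproduces the diagram below, where vector clocks correctly detect that A’s receive and B’s third event are concurrent.

from collections import defaultdict

class VectorClock:
    """Minimal vector clock sketch; names are illustrative."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.vc = defaultdict(int)   # nodeId -> highest clock value heard

    def local_event(self):
        # Rule 1: increment our own entry before every event.
        self.vc[self.node_id] += 1
        return dict(self.vc)

    def send(self):
        # Rule 2: a send is an event; attach a copy of the whole vector.
        self.vc[self.node_id] += 1
        return dict(self.vc)

    def receive(self, vc_msg):
        # Rule 3: count the receipt as a local event, then take the
        # entrywise max with the message's vector.
        self.vc[self.node_id] += 1
        for node, t in vc_msg.items():
            self.vc[node] = max(self.vc[node], t)
        return dict(self.vc)


def happened_before(vc1, vc2):
    # vc1 < vc2 in vector-clock order: <= everywhere, < somewhere.
    keys = set(vc1) | set(vc2)
    return (all(vc1.get(k, 0) <= vc2.get(k, 0) for k in keys)
            and any(vc1.get(k, 0) < vc2.get(k, 0) for k in keys))

def concurrent(vc1, vc2):
    return not happened_before(vc1, vc2) and not happened_before(vc2, vc1)


A, B = VectorClock("A"), VectorClock("B")
msg = B.send()                 # {B: 1}
s12 = A.receive(msg)           # {A: 1, B: 1}
B.local_event()                # {B: 2}
s23 = B.local_event()          # {B: 3}
print(concurrent(s12, s23))    # True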

%%{init: {'theme':'forest'}}%%

flowchart LR
subgraph n1["Node A"]
    S12[A:1, B:1]
    style S12 fill:#fff,stroke:#333,stroke-width:4px

end
subgraph n2["Node B"]
    S21[A:0, B:1] --> S12
    S21 --> S22[A:0, B:2] --> S23[A:0, B:3]
    style S23 fill:#fff,stroke:#333,stroke-width:4px
end

Matrix Clock: I Know What You Know (Knew)

Motivation: Consider a protocol where nodes forward received messages on to other nodes. Suppose, for purposes of reliability, you need to save a message $msg$ with a vector clock $vc_{msg}$ as long as there’s some node that may not have heard it yet.

To reclaim space safely, i.e. to delete $msg$, you’d need to know the vector clock $vc_n$ at every other node $n$, and check that $vc_{msg} < vc_{n}$ for every $n$.

Said differently, you’d like to know (a lower bound on) what other nodes know. Matrix clocks are a way to do this.

A matrix clock is simply a set of vector clocks $vc_n$, one for each participant $n$ in the network. We can stamp (some or all) messages with matrix clocks and update them entrywise with max, much as we did with vector clocks.
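A rough Python sketch of the bookkeeping (names like observe and safe_to_delete are mine, and I gloss over how the matrix stamps are actually piggybacked on messages):

class MatrixClock:
    """Minimal matrix-clock sketch: a lower bound on what each node knows."""

    def __init__(self, nodes):
        # One vector clock per participant, all entries starting at zero.
        self.m = {n: {p: 0 for p in nodes} for n in nodes}

    def observe(self, node, vc_seen):
        # Learn (a lower bound on) `node`'s vector clock, e.g. from a
        # matrix-clock stamp on a message, by taking the entrywise max.
        for p, t in vc_seen.items():
            self.m[node][p] = max(self.m[node].get(p, 0), t)

    def safe_to_delete(self, vc_msg):
        # A buffered message can be discarded once every node's known vector
        # clock dominates vc_msg, i.e. everyone has the message in its past.
        return all(
            all(vc_msg.get(p, 0) <= vc_n.get(p, 0) for p in vc_msg)
            for vc_n in self.m.values()
        )


mc = MatrixClock(["A", "B", "C"])
vc_msg = {"A": 1, "B": 2, "C": 0}
mc.observe("A", {"A": 3, "B": 2, "C": 1})
mc.observe("B", {"A": 1, "B": 4, "C": 0})
print(mc.safe_to_delete(vc_msg))   # False: for all we know, C has not seen it
mc.observe("C", {"A": 2, "B": 2, "C": 5})
print(mc.safe_to_delete(vc_msg))   # True: every known vector dominates vc_msg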

Dedalus

Early Days: Declarative Networking

In work on Declarative Networking in the first decade of the 2000s, my group demonstrated that many networking ideas are well-represented as declarative queries over graphs.

Network protocols deal at their core with computing and maintaining distributed state (e.g., routes, sessions, performance statistics) according to basic information locally available at each node (e.g., neighbor tables, link measurements, local clocks) while enforcing constraints such as local routing policies. Recursive query languages like Datalog fit well, with some “distributed” extensions.

We went through two language designs: NDlog and Overlog.

Both of these languages are similar to Datalog, extended with the idea that some attribute in each predicate is of the type NetworkAddress and serves as a location specifier: a declarative label that says “this tuple should live at the node in this attribute”. In that way it’s a lot like Gamma or any parallel database: we “shard” the EDB on location specifiers, and “route” the IDB tuples according to the location specifiers. The runtimes for the languages were responsible for enforcing this “routing”: they essentially caused the programs to send messages.

We built a few runtimes and showcase applications, including:

Problems

We ran into trouble with Overlog on a number of occasions, where programs did not do “what we expected”.

One problem arose implementing the Symphony distributed hash table. As I remember it, a part of Symphony’s topology is that nodes randomly request “neighbors” to connect to, but if a node already has a threshold number of inbound neighbors it rejects the request. We were seeing weird race conditions where a node would reject a neighbor request even though it had not met the threshold.

We tried to fix this by changing the runtime, but that only broke other programs. We realized that the problem was our language semantics weren’t clear. In particular, something was fishy around issues like aggregation, soft state, and racing messages in the network — i.e. non-monotonic logic meets non-deterministic ordering of messages!

Dedalus: Datalog in Time and Space

Dedalus was an effort to get a distributed language with non-deterministic message times, non-monotonic expressive power, and a clear, formal semantics. We got this right not by inventing a new language, but by leveraging decades of research on Datalog. In fact, Dedalus is literally a subset of $datalog^\neg$.

The key distinction from Overlog/NDlog was to carefully model time, not just space. After all, distributed systems are about time!

Intuitively, we can think of Dedalus as Datalog across space and time. That sounds fancy, but it’s not a lot more than adding a big GROUP BY location, time clause to every rule. This is simple, clean and surprisingly expressive!

Dedalus is nothing more than a few syntactic restrictions on $datalog^\neg$. Namely:

  1. Space/Time Columns: Each predicate has 2 distinguished columns at the far right, typically with the variables L and T: L is a location specifier (i.e. a nodeId), and T is a local clock value (a time). In essence, we’re saying that every fact in a Dedalus program is true at one point in space-time.
    • Note that the location specifier is very much like a hash-partitioning attribute in Gamma. We’re saying that each fact “lives” on a particular node.
    • If we want to replicate a fact, we create two “copies” with different $L$ attributes.
  2. Spatiotemporal Join Restriction: In the rhs of a rule, we require that the same variable is used for location and time in all predicates. In essence we’re saying that two facts join only if they are true at the same time in the same place – i.e. they’re in the same spatiotemporal GROUP, or they “rendezvous in space and time”.

  3. Successor: We assume a successor predicate succ(s, t) whose entries are pairs of positive integers that differ by 1. This will allow us to represent sequential logical time on a local node.

  4. Local Sequentiality: In the head of a rule, if the location variable is the same as in the body (a “local” rule), the time is allowed to be either
    1. the same as in the body (deduction), or
    2. the successor of the body variable (induction in time).
  5. Remote Asynchrony: In the head of a rule, if the location variable is not the same as the location variable in the body (a network message), the time must come from a special choice function:
     head(C1, ..., Cn, L2, T2) :- p1(..., L, T), ..., pk(..., L2,..., L, T),
                                 time(T2),
                                 choice((C1, ..., Cn, L2, L, T), T2).
    
    • time is the domain of legal timestamps, for range restriction
    • think of choice as a non-deterministically-chosen deterministic function that maps from its first argument to its second. That is, it chooses a random value T2 for each tuple in the head, capturing the idea that messages require an arbitrary delay to arrive. (But see the discussion of Causal Rewrites below.)

Syntax Sugar

To make Dedalus look more like datalog, we’ll introduce some syntax sugar.

First, rather than requiring the location specifier to be in the penultimate position, we require its variable to start with #. (There can be only one location specifier per predicate in a rule).

Second, we sugar the time attribute by omitting it in the body (since it’s the same in all body predicates) and annotating the head. There are only three cases to consider:

  1. No annotation on the head: a purely deductive rule; the head holds at the same timestep (and node) as the body.
  2. @next on the head: an inductive rule; the head holds at the successor of the body’s timestep, on the same node.
  3. @async on the head: an asynchronous rule, e.g. a message to another node; the head’s timestep is chosen non-deterministically via the choice construct above.

Example: Routing

Here is a little program that does Path-Vector routing, which is used in internet protocols.

// send links to dest end
linkD(Src, #Dest, Cost)@async :- link(#Src,Dest,Cost).

// local path computation, send new paths back to src end
path(#Src,Dest,Path,Cost) :- link(#Src,Dest,Cost), Path=f_init(Src,Dest).
path(#Src,Dest,Path,Cost)@async :- linkD(Src,#Nxt,Cost1), path(#Nxt,Dest,Path2,Cost2), Cost=Cost1+Cost2, Path=f_concatPath(Src,Path2).

// prune shortest paths at src end
spCost(#Src,Dest,min<Cost>) :- path(#Src,Dest,Path,Cost). 
shortestPath(#Src,Dest,Path,Cost) :- spCost(#Src,Dest,Cost), path(#Src,Dest,Path,Cost).
?- shortestPath(#Src,Dest,Path,Cost).

Local Time and Storage

The discussion above should make you a little uncomfortable as regards information persisting over time. Does a fact hold only at the single timestep where it is derived? Does stored state simply vanish unless we somehow re-derive it at the next timestep?

At a shallow level, the answers here are “yes”, so we need some better way to talk about persistence.

Well, one definition of the verb persist in Merriam-Webster’s dictionary is:

persist: to be insistent in the repetition or pressing of an utterance (such as a question or an opinion)

We can define persistence in Dedalus by a rule that persists data across time:

p(...)@next :- p(...).

Thanks to the semantics of @next, the head holds at the timestep just after the body’s timestep. So this rule says that every fact in p will persist, uninterrupted, from the time it first appears, ad infinitum. In old-school AI terms, this is a solution to the frame problem. We’ll call it a persistence rule.

The persistence rule above is nice if we want storage to accrue monotonically. What if we want to delete from storage over time? For any persistent predicate p we can introduce a predicate p_del with the same arity, into which you derive the facts you want deleted from p. Then we gently rewrite the persistence rule:

p(...)@next :- p(...), !p_del(...).

Note how this works: if you infer a fact is in p_del at time $t$, then it will no longer be in p as of time $t+1$. In essence, p_del breaks the chain of induction that was enforcing persistence of that fact.
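As a tiny illustration of that induction (my own framing, treating each predicate as a Python set of tuples at a given timestep):

def persist_step(p, p_del):
    # p(...)@next :- p(...), !p_del(...).
    # Facts in p carry over to the next timestep unless they appear in p_del.
    return {fact for fact in p if fact not in p_del}


p = {("alice", 1), ("bob", 2)}
p = persist_step(p, p_del=set())           # t -> t+1: everything persists
p = persist_step(p, p_del={("bob", 2)})    # t+1 -> t+2: bob's fact is dropped
print(p)                                   # {('alice', 1)}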

Stratification Over Time

Time is what keeps everything from happening at once. — Ray Cummings, The Girl in the Golden Atom, 1922.

As an extension of $datalog^\neg$, Dedalus requires a semantics of negation. We will adopt the notion of stratified negation, with a twist.

For intuition, recall that cyclic dependencies through negation amount to a contradiction:

p(A) :- !p(A).

This says that p(A) both exists and does not exist. We will forbid this in deductive Dedalus.

However, we will allow cycles through negation over time:

p(A)@next :- !p(A).

This says that p(A) toggles in and out of existence across timesteps. There is nothing contradictory about this. It is also easy to check statically on a Dedalus program: label edges in the Predicate Dependency Graph (PDG) with next and async, and reject only those programs that have a PDG cycle that goes through negation but contains no next or async edge.

This should concern us from a semantics perspective, because this program cannot be stratified! But that’s OK. In fact, this is just an example of local stratification in Datalog, and an easy one that we can determine syntactically.
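Here is a minimal sketch of that static check in Python (the edge-tuple input format and the function name are my own assumptions, not from any Dedalus implementation):

from collections import defaultdict, deque

def temporally_stratifiable(edges):
    # edges: iterable of (body_pred, head_pred, negated, temporal) tuples,
    # one per body literal; `temporal` is True when the rule is @next or @async.
    # Returns True iff every PDG cycle through negation traverses a temporal edge.
    #
    # A cycle with no temporal edge lives entirely in the deductive subgraph,
    # so we only need to look there.
    deductive = defaultdict(set)
    negative_deductive = []
    for body, head, negated, temporal in edges:
        if temporal:
            continue
        deductive[body].add(head)
        if negated:
            negative_deductive.append((body, head))

    def reaches(start, target):
        seen, frontier = {start}, deque([start])
        while frontier:
            node = frontier.popleft()
            if node == target:
                return True
            for nxt in deductive[node]:
                if nxt not in seen:
                    seen.add(nxt)
                    frontier.append(nxt)
        return False

    # A negative deductive edge body -> head closes a forbidden cycle exactly
    # when head can get back to body without crossing a temporal edge.
    return not any(reaches(head, body) for body, head in negative_deductive)


print(temporally_stratifiable([("p", "p", True, True)]))    # p(A)@next :- !p(A).  accepted
print(temporally_stratifiable([("p", "p", True, False)]))   # p(A) :- !p(A).       rejected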

Local Stratification in Datalog & Dedalus

Local Stratification is basically stratification analysis at the level of individual facts (ground atoms) rather than predicates.

In general, local stratification cannot be checked statically without knowing the EDB. The test for local stratification is as expensive as evaluation:

  1. Build dependency graph at the level of ground atoms by instantiating the rules.
  2. Program + EDB is locally stratified iff there are no cycles through negation in dependency graph.

It is easy to show that Dedalus programs are locally stratified: each fact can depend negatively only on facts from earlier timesteps (and negation within a single timestep is already handled by the PDG check above), so there are no cycles through negation in the instantiated dependency graph!

Causal Rewrite for Dedalus: Lamport Delay

In order to guarantee local stratification in Dedalus, we have to forbid facts from being “sent into the past” (at least if anything depends negatively on them!)

To do this, we will effectively rewrite input programs with a “Lamport clock delay” to ensure that the facts are persisted in a buffer until their intended Lamport-clock arrival time.

Consider a rule p(A,B)@async :- q(A,B). We will rewrite it as follows to use a receive buffer p_wait for tuples “from the future”. Note that we assume p_wait is a new predicate not used elsewhere in the program:

p_wait(A, B, N)@async :- q(A, B)@N.
p_wait(A, B, N)@next :- p_wait(A, B, N)@M, N ≥ M.
p(A, B)@next :- p_wait(A, B, N)@M, N < M.

This makes tuples from the future invisible to the rest of the rules of the program until the program “catches up” to them.

Note that we extended our syntax sugar here: we allowed timestamps to be visible in the body (the @N and @M annotations), and copied (“entangled”) the body timestamp into an ordinary, non-timestamp attribute of the head.
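Operationally, the rewrite behaves like a little receive buffer. Here is a rough Python sketch of that behavior under my own simplifying assumptions (local time advances one tick per call, and the @next offset is collapsed into the tick itself):

class DelayedDelivery:
    """Sketch of the causal-rewrite receive buffer (p_wait)."""

    def __init__(self):
        self.now = 0         # local timestep M
        self.p_wait = []     # buffered (payload, N): tuples stamped "from the future"
        self.p = []          # tuples visible to the rest of the program

    def receive(self, payload, sender_time):
        # p_wait(A, B, N)@async :- q(A, B)@N.
        self.p_wait.append((payload, sender_time))

    def tick(self):
        self.now += 1
        still_waiting = []
        for payload, n in self.p_wait:
            if n < self.now:
                # p(A, B)@next :- p_wait(A, B, N)@M, N < M.
                self.p.append(payload)
            else:
                # p_wait(A, B, N)@next :- p_wait(A, B, N)@M, N >= M.
                still_waiting.append((payload, n))
        self.p_wait = still_waiting


inbox = DelayedDelivery()
inbox.receive("hello", sender_time=3)      # stamped ahead of our local clock
for _ in range(3):
    inbox.tick()
print(inbox.p)                             # []: still buffered at local time 3
inbox.tick()
print(inbox.p)                             # ['hello']: released once N < M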

A Model Theory for Dedalus

At the beginning I claimed that Dedalus has a clear semantics from Datalog. But which semantics of negation? Well… moderately fancy ones, unfortunately, but they have intuitive explanations.

Who Cares??!

Dedalus provides for distributed programming what Datalog provides for traditional programming: a formal language that can be implemented, analyzed, optimized and made efficient for many programs. It is fully declarative, in the sense that it has a model-theoretic semantics: the program is defined solely by a specification of its visible output (i.e. a model).

With Dedalus, we became able to pursue the agenda started in Declarative Networking on a strong foundation. This has led us to more pragmatic languages including Bloom and Hydroflow.

Frankly, Dedalus is still very “nichey”, as the intersection of Datalog aficionados and Distributed Systems aficionados remains small. Sometimes being in a niche gives you unique powers, though! See for example David Chu’s upcoming SIGMOD paper (draft version here), which shows how to automatically and correctly refactor and shard complex protocols like Paxos to scale them out.

Hydro is our big bet that we can take ideas from Datalog, Gamma, and the like, and use them as a general purpose compiler stack for distributed, elastic programs. Hey, if autoscaling works for SQL, and it works for Datalog, then why not for everything!